Pipe 管道支持双向通信
Pipe 管道是通过 socket 实现的
在管道中数据是不安全的,因为管道中没有锁的机制,多个进程修改同一块数据时,就会导致数据的混乱,处理方式上锁
在管道中如果两个进程同时抢占一个资源的时候,在 Mac 和 Linux 就会报错,处理方式上锁
注意:没有被进程使用到的管道需要被关闭
1. Pipe管道的基本使用
from multiprocessing import Pipe
p1, p2 = Pipe() # 返回值:两个对象,可以理解为两条管道
p1.send('hello') # 管道p1发送了数据
print(p2.recv()) # hello 接收p1发送过来的数据,不需要传字节数
p2.send('world') # 管道p2发送了数据
p2.close() # 关闭 p2 管道
print(p1.recv()) # world 接收p2发送过来的数据,p2管道关闭了还可以接受到数据是因为数据是在关闭管道之前发送的,此时的数据已经存在管道里面了
print(p1.recv()) # EOFError 报错,当p2管道关闭了且里面没有任何数据那么就会报错,一般都会使用报错执行 try 来结束程序
2. 使用管道实现进程之间的通讯
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock
def fun(son):
while True:
try:
print(son.recv()) # 接收主进程发送过来的数据
except EOFError:
son.close() # 关闭子进程的管道
break
if __name__ == '__main__':
foo, son = Pipe()
p = Process(target=fun, args=(son,)) # 将其中一个管道传递给子进程进行通信
p.start()
son.close() # 将主进程中没有用到的管道关闭掉
foo.send('hello')
foo.send('hello')
foo.send('hello')
foo.send('hello')
foo.send('hello')
foo.close() # 数据发送完后关闭主进程的管道
3. 使用管道实现生产者消费者模型
# 生产者消费者模型1
import time
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock
# 生产者
def func1(foo, l):
for i in range(10):
foo.send(i) # 发送数据
foo.close() # 关闭管道
# 消费者
def func2(son, l):
while True:
try:
l.acquire() # 加锁
time.sleep(0.1)
print(son.recv()) # 接收生产者发送过来的数据
l.release() # 解锁
except EOFError:
l.release() # 要在这个地方进行解锁,因为上面没有解锁就报错了,如果不在这里解锁就会进行阻塞
son.close() # 关闭管道
break
if __name__ == '__main__':
foo, son = Pipe()
l = Lock()
# 生产者进程
f1 = Process(target=func1, args=(foo, l))
f1.start()
f2 = Process(target=func1, args=(foo, l))
f2.start()
f3 = Process(target=func1, args=(foo, l))
f3.start()
# 消费者进程
s1 = Process(target=func2, args=(son, l))
s1.start()
s2 = Process(target=func2, args=(son, l))
s2.start()
s3 = Process(target=func2, args=(son, l))
s3.start()
s4 = Process(target=func2, args=(son, l))
s4.start()
foo.close() # 关闭主进程没有使用到的管道
son.close() # 关闭主进程没有使用到的管道
# 生产者消费者模型2
import time
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock
# 生产者
def producer(produce, n):
for i in range(n):
produce.send(i) # 发送数据
produce.send(None) # 发送数据
produce.send(None) # 发送数据
produce.close() # 关闭管道
# 消费者
def consumer(consume, name, lock):
while True:
lock.acquire() # 加锁
baozi = consume.recv() # 接收生产者发送过来的数据
lock.release() # 解锁
if baozi:
time.sleep(0.1)
print('%s 吃了包子:%s' % (name, baozi))
else:
consume.close() # 关闭管道
break
if __name__ == '__main__':
produce, consume = Pipe()
lock = Lock()
# 生产者进程
p1 = Process(target=producer, args=(produce, 10))
p1.start()
# 消费者进程
c1 = Process(target=consumer, args=(consume, 'Kevin', lock))
c2 = Process(target=consumer, args=(consume, 'Yeung', lock))
c1.start()
c2.start()
produce.close() # 关闭主进程没有使用到的管道
consume.close() # 关闭主进程没有使用到的管道
p1.join()
c1.join()
c2.join()
print('主进程')